Spark Streaming中Kafka两种接收方式

注意:Kakfa 0.8版本支持在2.3版本中标记为弃用。

1 接收器方式

这种方式通过执行器上的接收器从Kafka接收数据。详见receiver reliability

默认配置存在故障时丢失数据的风险,需要使用预写日志保存接收的数据。详见Deploying section

(1) 连接

1
2
3
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.3.0

(2) 编程

1
2
3
4
import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

也可以使用键值类和解码器创建,详见API docs

注意:

  • 这种方式下主题分区与RDD分区不相关。增加接收的主题分区只是增加接收的线程数。
  • 可以使用多个接收器
  • 如果使用了预写日志, 数据已经和日志一同副本。因此可以设置存储等级为StorageLevel.MEMORY_AND_DISK_SER

(3) 部署

  • 打包spark-streaming-kafka-0-8_2.11及其依赖
  • 为spark-core_2.11和spark-streaming_2.11声明provided

2 直接方式

直接方式确保更强的端到端保证,周期性查询每个分区的最新偏移,并根据定义的偏移范围批量处理。

相较接收器方式,提供了以下便利:

  • 简化并行机制

    无需创建多个接收器并合并,自动实现Kafka分区和RDD分区一对一的映射。

  • 存储效率

    接收器方式对同一条记录实现了两次副本:Kakfa自身和预写日志,造成了效率上的损失。而直接方式直接利用Kafka的副本实现,无需预写日志。

  • 刚好一次

    接收器方式中偏移保存在Zookeeper中,存在因故障导致Spark Streaming和Zookeeper数据不一致而多次消费的情况。直接方式将偏移保存在检查点中,避免了不一致情形的发生,只需要保证输出的幂等或原子性事务。

注意:直接方式不会更新ZooKeeper中的偏移,因此基于ZooKeeper的监控工具不能显示进度,可以自行获取并更新偏移。

(1) 连接

1
2
3
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.3.0

(2) 编程

1
2
3
4
5
import org.apache.spark.streaming.kafka._

val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])

也可以通过messageHandler创建以操作MessageAndMetadata。详见API docs.

注意:默认从最近的偏移消费,可以设置auto.offset.reset为smallest从最远的偏移消费。

可以使用以下方式访问偏移:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array.empty[OffsetRange]

directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
...
}

注意:

  • HasOffsetRanges类型捕获仅对directKafkaStream的第一个方法调用有效
  • 分区映射关系仅在shuffle或repartition前有效
  • spark.streaming.receiver. 配置仅对接收器方式有效,spark.streaming.kafka.对直接方式有效

(3) 部署

同上

参考资料

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher)